package com.afly.netty.core.server;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.afly.netty.business.domain.Device;
import com.afly.netty.business.domain.Transaction;
import com.afly.netty.business.mapper.TransactionMapper;
import com.afly.netty.business.service.IDeviceService;
import com.afly.netty.business.service.ITransactionService;
import com.afly.netty.util.CacheUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Custom Netty inbound handler for device connections.
 *
 * <p>As implemented below, an inbound message whose first character is {@code 'D'} is stored as
 * a data record for the device bound to the channel; any other non-empty message is treated as a
 * device number and used to register the channel. All database work is handed to
 * {@link #poolTaskExecutor}; the handler itself only touches the {@link CacheUtil} maps and
 * writes the acknowledgement.
 */
@Slf4j
@Component
@Scope("prototype")
@AllArgsConstructor
public class CustomerServerHandler extends ChannelInboundHandlerAdapter {

    /** First character that marks an inbound message as a data report. */
    private static final char DATA_PREFIX = 'D';

    /** Payload written as the "device shut down" record. */
    private static final String SHUTDOWN_DATA = "1000";

    /** An outage shorter than this (15 minutes) is back-filled on reconnect. */
    private static final long COMPENSATION_WINDOW_MILLIS = 15L * 60 * 1000;

    /** Spacing between back-filled records, in minutes. */
    private static final long COMPENSATION_INTERVAL_MINUTES = 3L;

    /** {@link #COMPENSATION_INTERVAL_MINUTES} expressed in milliseconds. */
    private static final long COMPENSATION_INTERVAL_MILLIS = COMPENSATION_INTERVAL_MINUTES * 60 * 1000;

    private final ITransactionService transactionService;

    private final TransactionMapper transactionMapper;

    private final IDeviceService deviceService;

    // NOTE(review): Lombok only copies a field annotation onto the generated constructor
    // parameter when it is listed in lombok.copyableAnnotations; otherwise this qualifier is
    // ignored by constructor injection. Confirm the project's lombok.config.
    @Qualifier("taskExecutor")
    private final ThreadPoolTaskExecutor poolTaskExecutor;


    /**
     * Called once the channel is active; logs the connection and sends a greeting to the peer.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        String channelId = channel.id().toString();
        // NOTE(review): localAddress() is this end of the socket; if the client's address is
        // what should be reported, remoteAddress() is probably intended - confirm.
        String host = channel.localAddress().getHostString();
        log.info("链接报告开始");
        log.info("链接报告信息：有一客户端链接到本服务端");
        log.info("链接报告channelId:{}", channelId);
        log.info("链接报告IP:{}", host);
        log.info("链接报告Port:{}", channel.localAddress().getPort());
        log.info("链接报告完毕");
        // Tell the client the connection has been established.
        String greeting = "通知客户端链接建立成功" + " " + new Date() + " " + host + "\r\n";
        ctx.writeAndFlush(greeting);
    }

    /**
     * Called when the channel becomes inactive. For a registered channel this drops the cache
     * entries, records the offline time used for later compensation, and asynchronously marks
     * the device offline. Channels without a registered device are ignored.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Parameterised so that a null address cannot throw here.
        log.info("客户端断开链接{}", ctx.channel().localAddress());

        String channelId = ctx.channel().id().toString();
        String deviceNo = CacheUtil.deviceChannelMap.get(channelId);
        if (deviceNo == null) {
            log.info("错误数据跳过");
            return;
        }
        // Remove the device from the caches and remember when it went offline.
        CacheUtil.cacheClientChannel.remove(channelId);
        CacheUtil.deviceChannelMap.remove(channelId);
        CacheUtil.deviceOfflineMap.put(deviceNo, System.currentTimeMillis());
        ctx.channel().close();
        poolTaskExecutor.execute(() -> markDeviceOffline(deviceNo, "0"));
    }

    /**
     * Handles one inbound message and always acknowledges it to the peer.
     *
     * <p>Empty messages and data messages from channels that have no registered device are
     * logged and skipped instead of throwing or storing a record without a device number.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // NOTE(review): assumes an upstream decoder already turned the frame into a String;
        // a reference-counted message would not be released here - confirm the pipeline.
        String text = String.valueOf(msg);
        log.info("{} 服务端接收到消息：{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), text);
        long receivedAtMillis = System.currentTimeMillis();
        // Acknowledgement sent back to the client.
        String ack = "服务端收到：" + new Date() + " " + text + "\r\n";
        SocketChannel channel = (SocketChannel) ctx.channel();
        String channelId = channel.id().toString();

        if (text.isEmpty()) {
            // charAt(0) on an empty message used to throw and tear the connection down.
            log.warn("Ignoring empty message on channel {}", channelId);
        } else if (text.charAt(0) != DATA_PREFIX) {
            // Anything that is not a data report is treated as a device number (device online).
            poolTaskExecutor.execute(() -> registerDevice(channel, channelId, text, receivedAtMillis));
        } else {
            String deviceNo = CacheUtil.deviceChannelMap.get(channelId);
            if (deviceNo == null) {
                // Without a registered device the record could not be attributed to anything.
                log.warn("Dropping data from unregistered channel {}", channelId);
            } else {
                String payload = text.substring(1);
                // Persist the data record off the event loop.
                poolTaskExecutor.execute(() -> transactionService.save(Transaction.builder()
                        .deviceNo(deviceNo)
                        .data(payload)
                        .fromType("1")
                        .time(LocalDateTime.now())
                        .build()));
            }
        }

        ctx.writeAndFlush(ack);
    }

    /**
     * Called when an exception reaches this handler: drops the channel from the caches, closes
     * it and, if a device was registered on it, asynchronously marks that device offline.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String channelId = ctx.channel().id().toString();
        String deviceNo = CacheUtil.deviceChannelMap.get(channelId);

        // Clean up and close first so that a rejected executor task cannot skip it.
        CacheUtil.deviceGroup.remove(channelId);
        CacheUtil.cacheClientChannel.remove(channelId);
        CacheUtil.deviceChannelMap.remove(channelId);
        ctx.close();
        // Pass the throwable as well so the stack trace is not lost.
        log.warn("异常信息：\r\n" + cause.getMessage(), cause);

        if (deviceNo == null) {
            // No device was registered on this channel, so there is nothing to mark offline.
            return;
        }
        // NOTE(review): this path stores the shutdown record with fromType "1" while
        // channelInactive uses "0", and it does not record an offline time in
        // CacheUtil.deviceOfflineMap. Kept as in the original - confirm both are intended.
        poolTaskExecutor.execute(() -> markDeviceOffline(deviceNo, "1"));
    }

    /**
     * Binds the channel to the given device, marks the device online and back-fills data for a
     * short outage. Does nothing if the device number is unknown to {@link #deviceService}.
     */
    private void registerDevice(SocketChannel channel, String channelId, String deviceNo, long receivedAtMillis) {
        Device known = deviceService.getOne(new QueryWrapper<Device>().lambda()
                .eq(Device::getNumber, deviceNo));
        if (known == null) {
            // Unknown device: nothing to do.
            log.info("该设备还未添加或者其他数据不做处理");
            return;
        }
        CacheUtil.cacheClientChannel.put(channelId, channel);
        CacheUtil.deviceChannelMap.put(channelId, deviceNo);

        Device online = Device.builder()
                .number(deviceNo)
                .status(1)
                .lastOnline(LocalDateTime.now())
                .build();
        // Boolean.TRUE.equals guards against a null boxed result.
        if (Boolean.TRUE.equals(deviceService.updateDeviceStatus(online))) {
            log.info("修改设备{}在线成功,时间{}", deviceNo, new Date().toInstant());
        } else {
            log.info("修改设备{}在线失败,时间{}", deviceNo, new Date().toInstant());
        }

        compensateShortOutage(deviceNo, receivedAtMillis);
    }

    /**
     * If the device went offline less than {@link #COMPENSATION_WINDOW_MILLIS} ago, repeats its
     * most recent {@code fromType "1"} record once per elapsed 3-minute slot, each copy stamped
     * 3 minutes after the previous one, starting from the time of that record.
     */
    private void compensateShortOutage(String deviceNo, long receivedAtMillis) {
        Long offlineAtMillis = CacheUtil.deviceOfflineMap.get(deviceNo);
        if (offlineAtMillis == null) {
            return;
        }
        long outageMillis = receivedAtMillis - offlineAtMillis;
        if (outageMillis >= COMPENSATION_WINDOW_MILLIS) {
            return;
        }

        // Most recent data record of this device.
        Transaction last = transactionMapper.selectOne(new QueryWrapper<Transaction>().lambda()
                .eq(Transaction::getDeviceNo, deviceNo)
                .eq(Transaction::getFromType, "1")
                .orderByDesc(Transaction::getId)
                .last("limit 1"));
        if (last == null || last.getTime() == null) {
            log.info("No previous record to compensate from for device {}", deviceNo);
            return;
        }

        long missingSlots = outageMillis / COMPENSATION_INTERVAL_MILLIS;
        List<Transaction> fillers = new ArrayList<>();
        for (long slot = 1; slot <= missingSlots; slot++) {
            // A fresh object per slot: re-adding and mutating the fetched row would give every
            // list entry the same timestamp and carry the existing id into the insert.
            fillers.add(Transaction.builder()
                    .deviceNo(deviceNo)
                    .data(last.getData())
                    .fromType(last.getFromType())
                    .time(last.getTime().plusMinutes(COMPENSATION_INTERVAL_MINUTES * slot))
                    .build());
        }

        // Outage shorter than one slot: nothing to insert, the marker has been handled.
        if (fillers.isEmpty() || transactionService.saveBatch(fillers)) {
            CacheUtil.deviceOfflineMap.remove(deviceNo);
        }
    }

    /**
     * Sets the device status to offline and stores a shutdown record with the given
     * {@code fromType}. Intended to run on {@link #poolTaskExecutor}.
     */
    private void markDeviceOffline(String deviceNo, String shutdownFromType) {
        Device offline = Device.builder()
                .number(deviceNo)
                .status(0)
                .lastOnline(LocalDateTime.now())
                .build();
        if (Boolean.TRUE.equals(deviceService.updateDeviceStatus(offline))) {
            log.info("修改设备{}离线成功,时间{}", deviceNo, new Date().toInstant());
        } else {
            log.info("修改设备{}离线失败,时间{}", deviceNo, new Date().toInstant());
        }
        // Add one "device shut down" record.
        boolean saved = transactionService.save(Transaction.builder()
                .data(SHUTDOWN_DATA)
                .deviceNo(deviceNo)
                .fromType(shutdownFromType)
                .time(LocalDateTime.now())
                .build());
        if (!saved) {
            log.warn("新增设备关机数据失败");
        }
    }

}
